Python InfluxDBClient.write 您所在的位置:网站首页 influxdbclient influx语法 Python InfluxDBClient.write

Python InfluxDBClient.write

2024-01-17 11:46| 来源: 网络整理| 查看: 265

本文整理汇总了Python中influxdb.InfluxDBClient.write_points方法的典型用法代码示例。如果您正苦于以下问题:Python InfluxDBClient.write_points方法的具体用法?Python InfluxDBClient.write_points怎么用?Python InfluxDBClient.write_points使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在influxdb.InfluxDBClient的用法示例。

在下文中一共展示了InfluxDBClient.write_points方法的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于系统推荐出更棒的Python代码示例。

示例1: log_to_influx # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def log_to_influx(zone_details): influx_client = InfluxDBClient(influx_host, influx_port, influx_user, influx_password, db_name) data = [] ts = datetime.utcnow() ts = ts.replace(microsecond=0) for zone in zone_details: temp_actual = zone['temp'] temp_target = zone['setpoint'] zone_name = zone['name'] record_actual, record_target, record_delta = prep_record(ts, zone_name, temp_actual, temp_target) if record_actual: data.append(record_actual) if record_target: data.append(record_target) if record_delta: data.append(record_delta) print "%s : %s (%s, %s)" % (ts, zone_name, temp_actual, temp_target) try: if write_to_influx: influx_client.write_points(data) except InfluxDBClientError as e: print e开发者ID:andrew-blake,项目名称:evohome-utils,代码行数:32,代码来源:influx_utils.py 示例2: execute # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def execute(db): print("THIS IS DB: {}\n".format(db)) t0 = time.time() count, v = CreateBatch() t1 = time.time() reading = t1-t0 rate1 = float(count)/reading print("TIME READING: {} SPEED PER REQUEST: {}\n".format(reading,rate1)) if db == '1': print("ENTERED 1") client = InfluxDBClient(host='127.0.0.1', port=8086, database='newDB') t2 = time.time() client.write_points(v) t3 = time.time() influx = t3-t2 rate2 = float(count)/influx print("TIME INFLUX: {} SPEED PER REQUEST: {}\n\n".format(influx,rate2)) if db == '2': print("ENTERED TWO") client = InfluxDBClient(host='127.0.0.1', port=8086, database='DB2') t2 = time.time() client.write_points(v) t3 = time.time() influx = t3-t2 rate2 = float(count)/influx print("TIME INFLUX: {} SPEED PER REQUEST: {}\n\n".format(influx,rate2))开发者ID:BU-NU-CLOUD-SP16,项目名称:Network-traffic-collection,代码行数:29,代码来源:readpcap_v2.py 示例3: test_write_points_batch # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def test_write_points_batch(self): dummy_points = [ {"measurement": "cpu_usage", "tags": {"unit": "percent"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, {"measurement": "network", "tags": {"direction": "in"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, {"measurement": "network", "tags": {"direction": "out"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] expected_last_body = ( "network,direction=out,host=server01,region=us-west " "value=12.0 1257894000000000000\n" ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, "http://localhost:8086/write", status_code=204) cli = InfluxDBClient(database='db') cli.write_points(points=dummy_points, database='db', tags={"host": "server01", "region": "us-west"}, batch_size=2) self.assertEqual(m.call_count, 2) self.assertEqual(expected_last_body, m.last_request.body.decode('utf-8'))开发者ID:simudream,项目名称:influxdb-python,代码行数:29,代码来源:client_test.py 示例4: send_to_influx # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def send_to_influx(self): if not self.config['hostname']: self.log.error("No Influx server configured, please set one using: " "ceph influx config-set hostname ") self.set_health_checks({ 'MGR_INFLUX_NO_SERVER': { 'severity': 'warning', 'summary': 'No InfluxDB server configured', 'detail': ['Configuration option hostname not set'] } }) return # If influx server has authentication turned off then # missing username/password is valid. self.log.debug("Sending data to Influx host: %s", self.config['hostname']) client = InfluxDBClient(self.config['hostname'], self.config['port'], self.config['username'], self.config['password'], self.config['database'], self.config['ssl'], self.config['verify_ssl']) # using influx client get_list_database requires admin privs, # instead we'll catch the not found exception and inform the user if # db can not be created try: client.write_points(self.get_df_stats(), 'ms') client.write_points(self.get_daemon_stats(), 'ms') self.set_health_checks(dict()) except ConnectionError as e: self.log.exception("Failed to connect to Influx host %s:%d", self.config['hostname'], self.config['port']) self.set_health_checks({ 'MGR_INFLUX_SEND_FAILED': { 'severity': 'warning', 'summary': 'Failed to send data to InfluxDB server at %s:%d' ' due to an connection error' % (self.config['hostname'], self.config['port']), 'detail': [str(e)] } }) except InfluxDBClientError as e: if e.code == 404: self.log.info("Database '%s' not found, trying to create " "(requires admin privs). You can also create " "manually and grant write privs to user " "'%s'", self.config['database'], self.config['username']) client.create_database(self.config['database']) else: self.set_health_checks({ 'MGR_INFLUX_SEND_FAILED': { 'severity': 'warning', 'summary': 'Failed to send data to InfluxDB', 'detail': [str(e)] } }) raise开发者ID:bspark8,项目名称:ceph,代码行数:62,代码来源:module.py 示例5: test_request_retry_raises # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def test_request_retry_raises(self, mock_request): """Test that three requests errors will not be handled.""" class CustomMock(object): """Create custom mock object for test.""" def __init__(self): self.i = 0 def connection_error(self, *args, **kwargs): """Handle a connection error for the CustomMock object.""" self.i += 1 if self.i < 4: raise requests.exceptions.HTTPError else: r = requests.Response() r.status_code = 200 return r mock_request.side_effect = CustomMock().connection_error cli = InfluxDBClient(database='db') with self.assertRaises(requests.exceptions.HTTPError): cli.write_points(self.dummy_points)开发者ID:bbc,项目名称:influxdb-python,代码行数:27,代码来源:client_test.py 示例6: main # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def main(): print 'please enter host ip:' host = raw_input() print 'please enter port:' port = raw_input() print 'please enter user:' user = raw_input() print 'please enter password:' pwd = getpass.getpass() print 'please enter database:' db = raw_input() _db_client = InfluxDBClient(host, port, username=user, password=pwd, database=db, ssl=False, verify_ssl=False, timeout=5) print '------------------\n------------------' print 'please enter date in the format: yyyy-mm-dd:' date = raw_input() print 'please enter utc(! 2 hours before summertime !) time in the format: hh-mm-ss:' time = raw_input() print 'please enter event caption:' caption = raw_input() out = [{'measurement': 'event', 'time' : date + 'T' + time + '.000000Z', 'fields' : {'text' : caption}}] # time MUST be UTC! (2 hours less than summertime) try: print time.ctime() + ': send data...' , print _db_client.write_points(out) except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError, InfluxDBClientError, InfluxDBServerError) as e: print e print time.ctime() + ": Error while sending data..."开发者ID:ma-tri-x,项目名称:ESpy,代码行数:33,代码来源:writeInfluxEvent.py 示例7: main # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def main(): """ Main demo program. Takes samples of computer metrics and stores them into InfluxDB. """ influxClient = InfluxDBClient( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, ssl=USE_SSL ) setupDatabase(influxClient) for i in xrange(SAMPLE_COUNT): print "Collection sample of your computer metrics..." if i % 10 == 0: print "\t({} samples remaining before I quit)".format(SAMPLE_COUNT - i) # Sample computer for metrics. sample = getSample() # Create an object to write to InfluxDB containing all sample measurements. payload = createInfluxPayload(sample) # Write all measurements. influxClient.write_points(payload) # Wait. time.sleep(SAMPLE_INTERVAL_SECONDS)开发者ID:it-man-cn,项目名称:influx-demo,代码行数:33,代码来源:collect_samples.py 示例8: main # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def main(val, measurename, host='localhost', port='8086'): """Instantiate a connection to the InfluxDB.""" user = 'root' password = 'root' dbuser = 'smly' dbuser_password = 'my_secret_password' query = 'select value from cpu_load_short;' dbname = 'kang_test' client = InfluxDBClient(host, port, None, None, dbname) query = 'show measurements on %s;' %dbname print("Querying data: " + query) result = client.query(query) print("Result: {0}".format(result)) write_json[0]['measurement'] = measurename write_json[0]['fields']['dust_val'] = val write_json[0]['time'] = getUTCnow() print("Write points: {0}".format(write_json)) client.write_points(write_json) query = 'select * from %s order by desc limit 1;' %measurename print("Querying data: " + query) ''' write한 데이터가 제대로 입력되었는지 확인 ''' result = client.query(query) print("Result: {0}".format(result))开发者ID:jeonghoonkang,项目名称:BerePi,代码行数:29,代码来源:influx_simple_driver.py 示例9: write # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def write(self, repository, counters, current_time=int(time.time())): client = InfluxDBClient(self.host, self.port, self.user, self.password, None, True, False) print client.get_list_database() if {"name": self.database} not in client.get_list_database(): client.create_database(self.database) client.switch_database(self.database) ts = time.time() st = datetime.datetime.utcfromtimestamp(ts).strftime('%Y-%m-%dT%H:%M:%SZ') json_body = [{ "measurement": "time", "tags": { "host": "cvm-perf01", "region": repository, }, "time": st, "fields": { "value": current_time } }] client.write_points(json_body) for counter in counters.values(): json_body[0]["measurement"] = counter.name json_body[0]["fields"]["value"] = int(counter.avg()) client.write_points(json_body)开发者ID:cvmfs,项目名称:cvmfs,代码行数:28,代码来源:statistics_collector.py 示例10: InfluxDB # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] class InfluxDB(object): """Implements a InfluxDB client handler""" def __init__(self, name, config, *args, **kwargs): logger.debug('Starting InfluxDB client') self.client = InfluxDBClient(**config.get('client')) self.name = name def write(self, point): """Write to InfluxDB point is a tuple that contain (DATE, VALUE) """ host = "{0}-{1}".format(self.name, socket.gethostname()) json_body = [{ "measurement": self.name, "tags": { "host": host, }, "time": point[0], "fields": { "value": point[1] }}] logger.debug('Write on InfluxDB: {0}. point: {1}'.format( host, point)) self.client.write_points(json_body)开发者ID:WeLikeAlpacas,项目名称:Qpaca,代码行数:30,代码来源:influx.py 示例11: do_GET # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def do_GET(self): # Part 1: Get the correct GET request from jeedom try: parsed_url = urllib.parse.urlparse(self.path) query = urllib.parse.parse_qs(parsed_url.query) # Extract the value, the name and the location + add current time val = float(query["val"][0]) name = query["name"][0] name = name.encode("latin-1").decode("utf-8") location = query["location"][0] act_time = time.time() * 1000000000 except: print("URL Parsing error: ", sys.exc_info()[0]) self.send_response(400) self.send_header("Content-type", "text/html") self.end_headers() return # Part 2: Write Data to InfluxDB self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() client = InfluxDBClient( INLUXDB_SERVER_IP, INLUXDB_SERVER_PORT, INFLUXDB_USERNAME, INFLUXDB_PASSWORD, INFLUXDB_DB_NAME ) # Build JSON data req = [{"measurement": name, "tags": {"lieu": location}, "time": int(act_time), "fields": {"value": val}}] client.write_points(req) return开发者ID:neuhausj,项目名称:JeedomTools,代码行数:33,代码来源:Bridge_Jeedom_InfluxDB.py 示例12: store_data # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def store_data(xml_data=None): root = ET.fromstring(xml_data) ipe_id = root.find('./*[@name="ipeId"]').attrib['val'] app_id = root.find('./*[@name="appId"]').attrib['val'] category = root.find('./*[@name="category"]').attrib['val'] data = int(root.find('./*[@name="data"]').attrib['val']) unit = root.find('./*[@name="unit"]').attrib['val'] json_body = [ { "measurement": "sensor_status", "tags": { "sensor_id": app_id, "ipe_id": ipe_id, "category": category }, "fields": { "data": data, "unit": unit } } ] influxdb_host = 'localhost' if os.environ.get('INFLUXDB_HOST_NAME'): influxdb_host = os.environ['INFLUXDB_HOST_NAME'] client = InfluxDBClient(influxdb_host, os.environ.get('INFLUXDB_PORT'), 'root', 'root', 'oneM2M') client.write_points(json_body)开发者ID:cloudcomputinghust,项目名称:IoT,代码行数:28,代码来源:influxdb_client.py 示例13: FillInfluxDBLoaderBolt # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] class FillInfluxDBLoaderBolt(Bolt): # TODO use passed in options # This spout may need to throttle while doing bulk inserts def initialize(self, storm_conf, context): #TODO debug storm_conf, context self.client = InfluxDBClient("streamparse-box", 8086, "root", "root", "prices_development") def process(self, tup): print(tup) fill = Fill(*tup.values) # tags are dimensions, fields are facts payload = { "measurement": "fills", "tags": { "taxpayer_id": str(fill.taxpayer_id), "portfolio_order_id": str(fill.portfolio_order_id), "component_order_id": str(fill.component_order_id), "market_order_id": str(fill.market_order_id), "execution_id": str(fill.execution_id), "fill_id": str(fill.fill_id), "symbol": str(fill.symbol) }, "fields": { "filled_amount": fill.filled_amount, "filled_price": fill.filled_price, "filled_shares": fill.filled_shares, "delta_shares_position": fill.delta_shares_position, "delta_cash_position": fill.delta_cash_position }, "timestamp": fill.timestamp } self.client.write_points([payload])开发者ID:ygoldman,项目名称:fintank,代码行数:35,代码来源:fills.py 示例14: InfluxDatabase # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] class InfluxDatabase(Database): def __init__(self, props, database): super().__init__(database) host = props.get('host', 'localhost') port = props.get('port', 8086) self.client = InfluxDBClient(host=host, port=port) self.client.create_database(self.database) self.client.switch_database(self.database) def now(self): return super().now() def _write_all_points(self, points, align_points=False): if align_points: time = self.now() for p in points: p['time'] = time self.client.write_points(points) def _point(self, name, fields, tags={}): return { 'measurement': name, 'time': self.now(), 'tags': tags, 'fields': fields } def time_format(self): return '%Y-%m-%dT%H:%M:%SZ'开发者ID:spinnaker,项目名称:spinnaker,代码行数:34,代码来源:influx_database.py 示例15: write_influxdb_list # 需要导入模块: from influxdb import InfluxDBClient [as 别名] # 或者: from influxdb.InfluxDBClient import write_points [as 别名] def write_influxdb_list(logger, host, port, user, password, dbname, data): """ Write an entry into an Influxdb database example: write_influxdb('localhost', 8086, 'mycodo', 'password123', 'mycodo_db', data_list_of_dictionaries) :return: success (0) or failure (1) :rtype: bool :param host: What influxdb address :type host: str :param port: What influxdb port :type port: int :param user: What user to connect to influxdb with :type user: str :param password: What password to supply for Influxdb user :type password: str :param dbname: What Influxdb database name to write to :type dbname: str :param data_list_of_dictionaries: The data being entered into the Influxdb database. See controller_sensor.py function addMeasurementInfluxdb() :type data_list_of_dictionaries: list of dictionaries """ client = InfluxDBClient(host, port, user, password, dbname) try: client.write_points(data) return 0 except Exception as except_msg: logger.debug('Failed to write measurements to influxdb (Device ID: ' '{}). Data that was submitted for writing: {}. ' 'Exception: {}'.format(device_id, data, except_msg)) return 1开发者ID:rafatahmed,项目名称:Mycodo,代码行数:37,代码来源:daemonutils.py

注:本文中的influxdb.InfluxDBClient.write_points方法示例由纯净天空整理自Github/MSDocs等开源代码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有